package cn.liwq.demo.mq.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * <p>Project: demo-common </p>
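 * <p>Function: Kafka consumer demo </p>
 * <p>Description: Subscribes to a topic and prints the offset and value of each record it receives. </p>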
 * <p>Copyright: Copyright(c) 2018 Ucarinc </p>
 * <p>Company: Ucarinc </p>
 *
 * @author weiqiang.li@ucarinc.com
 * @version 1.0
 * @date 2018-08-09 15:45:11
 */
public class ConsumerDemo {

    /**
     * Starts a consumer in group {@code group-1}, subscribes it to {@code liwq-r-topic}
     * and prints the offset and value of every record it polls.
     *
     * <p>The poll loop has no exit condition; the method only returns when an exception
     * is thrown, in which case the consumer is closed and the stack trace is printed.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties properties = buildConsumerProperties();

        // try-with-resources guarantees consumer.close() runs when the loop is aborted by
        // an exception, so network resources are released and the consumer leaves its
        // group explicitly instead of waiting for the session timeout to expire.
        try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList("liwq-r-topic"));
            while (true) {
                // Wait up to 500 ms for new records before returning a (possibly empty) batch.
                ConsumerRecords<String, String> records = consumer.poll(500);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d value = %s \r\n", record.offset(), record.value());
                }
            }
        } catch (Exception e) {
            // Demo program: report the failure and let main return.
            e.printStackTrace();
        }
    }

    /**
     * Builds the consumer configuration used by this demo.
     *
     * @return properties pointing at two local brokers, with auto-commit enabled
     *         (1000 ms interval), offset reset set to {@code earliest}, a 30000 ms
     *         session timeout and string deserializers for keys and values
     */
    private static Properties buildConsumerProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }
}
